{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 02: WordCount - A First Look at WordCount\n",
    "\n",
    "We'll use this popular algorithm to learn some idioms for working with data using Spark's [RDD](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD) API, the original API. \n",
    "\n",
    "The classic, simple *Word Count* algorithm is easy to understand and it's suitable for parallel computation, so it's a good vehicle when first learning a Big Data API.\n",
    "\n",
    "In *Word Count*, you read a corpus of documents, tokenize each one into words, and count the occurrences of all the words globally."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The corresponding Spark program is [WordCount2.scala](https://github.com/deanwampler/spark-scala-tutorial/blob/master/src/main/scala/sparktutorial/WordCount2.scala). It shows you how to structure a Spark program, including imports and one way to construct the required [SparkContext](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext)."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We'll use the KJV Bible text again. Subsequent exercises will add the ability to specify different input sources using command-line arguments.\n",
    "\n",
    "Let's work through the code. Recall that the notebook environment already created a `SparkContext` instance for us, named `sc`. As in the Introduction, we read the text file and convert the lines to lower case."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "input = MapPartitionsRDD[2] at map at <console>:27\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "MapPartitionsRDD[2] at map at <console>:27"
      ]
     },
     "execution_count": 1,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "val input = sc.textFile(\"../data/kjvdat.txt\").map(line => line.toLowerCase)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "It would be nice if the output showed the full type information. In the Scala interpreter used by `spark-shell`, you would see this output:\n",
    "\n",
    "```scala\n",
    "input: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at map at <console>:27\n",
    "```\n",
    "The right-hand side of the `=` is a little confusing, since Scala uses `[...]` for type parameters, but Spark's `MapPartitionsRDD.toString` method returns the class name and `[n]`, where the `n` is an index used internally.\n",
    "\n",
    "Hence, the `input` _reference_ is of type `RDD[String]`, meaning the \"records\" have a single \"column\" of type `String`. The actual type of the object is `MapPartitionsRDD[String]`.\n",
    "\n",
    "So, where useful, I'll use a comment to show you the type of an expression."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If you will read `input` several times, cache the data so Spark doesn't reread from disk each time!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "MapPartitionsRDD[2] at map at <console>:27"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "input.cache"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The following is one long statement. Take the `input` and:\n",
    "\n",
    "1. Split each line on runs of non-alphabetic characters (a crude form of tokenization). `flatMap` \"flattens\" each array returned into a single `RDD` of words.\n",
    "2. Map over each word and return a tuple with the word and a \"seed\" count of 1.\n",
    "3. Do the equivalent of a SQL `GROUP BY`, with each word as the key (the first tuple element), then \"collapse\" the groups by summing the counts, returning `(word, N)` tuples.\n",
    "\n",
    "The comments show the type returned by that part of the long expression."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "wc = ShuffledRDD[5] at reduceByKey at <console>:32\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "ShuffledRDD[5] at reduceByKey at <console>:32"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "val wc = input\n",
    "  .flatMap(line => line.split(\"\"\"[^\\p{IsAlphabetic}]+\"\"\"))  // RDD[String]\n",
    "  .map(word => (word, 1))                                   // RDD[Tuple2[String,Int]], a.k.a. RDD[(String,Int)]\n",
    "  .reduceByKey((count1, count2) => count1 + count2)         // RDD[(String,Int)]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Do I have to name each `count*` variable? No. A popular shorthand is to use the `_` placeholder for each function argument, converting the last line to `reduceByKey(_ + _)`. It's worth getting used to this idiom; you'll see it a lot!"
   ]
  },
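  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To see that the two forms are equivalent without involving Spark, here is a minimal sketch on an ordinary Scala collection. The `counts` list is made up for illustration; `reduce` on a `List` plays roughly the role that `reduceByKey` plays for the values of a single key.\n",
    "\n",
    "```scala\n",
    "// Hypothetical sample data: the seed counts collected for one word.\n",
    "val counts = List(1, 1, 1, 1)\n",
    "\n",
    "// Explicitly named arguments, as in the cell above.\n",
    "val total1 = counts.reduce((count1, count2) => count1 + count2)\n",
    "\n",
    "// Placeholder shorthand: each `_` stands for the next argument, in order.\n",
    "val total2 = counts.reduce(_ + _)\n",
    "\n",
    "println(s\"$total1 $total2\")  // prints \"4 4\"\n",
    "```\n",
    "\n",
    "The shorthand only works when each argument is used exactly once and in order, which is the case for a simple sum."
   ]
  },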
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Finally, save the results.\n",
    "\n",
    "> **Note:** If you run the next cell more than once, _delete the output directory first!_ Spark, following Hadoop conventions, won't overwrite an existing directory."
   ]
  },
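  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "One way to avoid a `FileAlreadyExistsException` when experimenting locally is to remove the old directory before saving. The following is a minimal sketch that assumes local mode, where the output path is an ordinary directory on the local file system. The helper name `deleteRecursively` is made up for this example; for HDFS you would use Hadoop's file system API instead.\n",
    "\n",
    "```scala\n",
    "import java.io.File\n",
    "\n",
    "// Hypothetical helper: delete a file, or a directory and everything under it.\n",
    "def deleteRecursively(file: File): Unit = {\n",
    "  if (file.isDirectory) {\n",
    "    // listFiles can return null on I/O errors, so guard against that.\n",
    "    Option(file.listFiles).getOrElse(Array.empty[File]).foreach(deleteRecursively)\n",
    "  }\n",
    "  file.delete()\n",
    "}\n",
    "\n",
    "// Assumed to be the same relative path that is passed to saveAsTextFile.\n",
    "deleteRecursively(new File(\"output/kjv-wc2\"))\n",
    "```\n",
    "\n",
    "Calling the helper on a path that doesn't exist is harmless, so it is safe to run before the first save, too."
   ]
  },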
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Writing output to: notebooks/output/kjv-wc2\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "Name: org.apache.hadoop.mapred.FileAlreadyExistsException\n",
       "Message: Output directory file:/home/jovyan/notebooks/output/kjv-wc2 already exists\n",
       "StackTrace:   at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:131)\n",
       "  at org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil.assertConf(SparkHadoopWriter.scala:283)\n",
       "  at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)\n",
       "  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1096)\n",
       "  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1094)\n",
       "  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1094)\n",
       "  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n",
       "  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)\n",
       "  at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)\n",
       "  at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1094)\n",
       "  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1067)\n",
       "  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1032)\n",
       "  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1032)\n",
       "  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n",
       "  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)\n",
       "  at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)\n",
       "  at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1032)\n",
       "  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:958)\n",
       "  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:958)\n",
       "  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:958)\n",
       "  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n",
       "  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)\n",
       "  at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)\n",
       "  at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:957)\n",
       "  at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1493)\n",
       "  at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1472)\n",
       "  at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1472)\n",
       "  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n",
       "  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)\n",
       "  at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)\n",
       "  at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1472)"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "println(\"Writing output to: notebooks/output/kjv-wc2\")\n",
    "wc.saveAsTextFile(\"output/kjv-wc2\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Recap\n",
    "\n",
    "We loaded and cached the data, although caching isn't really useful this time, since we make a single pass through the data. I left this statement here just to remind you of this feature.\n",
    "\n",
    "Then we set up a pipeline of operations to perform the word count.\n",
    "\n",
    "First, each line was split into words, using as the separator any run of characters that isn't alphabetic, e.g., digits, whitespace, and punctuation. Note that the regex `\"\\\\W+\"` doesn't work well for non-ASCII text, because by default it treats only ASCII letters, digits, and `_` as word characters! Splitting on non-alphabetic characters also conveniently removes the trailing `~` characters at the end of each line that exist in the file for some reason.\n",
    "\n",
    "With this regex, `input.flatMap(line => line.split(...))` maps over each line, expanding it into a collection of words, yielding a collection of collections of words. The `flat` part flattens those nested collections into a single, \"flat\" collection of words.\n",
    "\n",
    "The next line converts each single-word \"record\" into a tuple with the word and a count of `1`. In Spark, the first element of a two-element tuple is used as the key for joins, group-bys, and the `reduceByKey` we use next.\n",
    "\n",
    "The `reduceByKey` step effectively groups together all the tuples with the same word (the key) and then \"reduces\" the values using the passed-in function. In this case, the two counts are added together. Hence, we get two-element *records* with unique words and their counts.\n",
    "\n",
    "Finally, we invoke `saveAsTextFile` to write the final RDD to the output location.\n",
    "\n",
    "Note that the input and output locations will be relative to the local file system, when running in local mode, and relative to the user's home directory in HDFS (e.g., `/user/$USER`), when a program runs with HDFS (e.g., in Hadoop, Mesos, etc.).\n",
    "\n",
    "Spark also follows another Hadoop convention for file I/O: the output path is actually interpreted as a directory name. It will contain the same `_SUCCESS` and `part-00000` files discussed previously. In a real cluster with lots of data and lots of concurrent tasks, there would be many `part-NNNNN` files."
   ]
  },
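  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The difference between the two regexes is easy to check on a plain Scala string, without Spark. This is a small sketch; the sample line is invented, loosely following the `book|chapter#|verse#|verse_text` layout of the input file, with an accented word added to show the effect.\n",
    "\n",
    "```scala\n",
    "// Hypothetical input line containing an accented e (U+00E9).\n",
    "val line = \"gen|1|1| caf\\u00e9 in the beginning~\"\n",
    "\n",
    "// By default, only ASCII letters, digits, and _ count as word characters here,\n",
    "// so the accented letter acts as a separator and the digits survive as \"words\".\n",
    "val viaW = line.split(\"\"\"\\W+\"\"\").toList\n",
    "\n",
    "// Splitting on runs of non-alphabetic characters keeps the accented word whole\n",
    "// and drops the digits, the | delimiters, and the trailing ~.\n",
    "val viaAlphabetic = line.split(\"\"\"[^\\p{IsAlphabetic}]+\"\"\").toList\n",
    "\n",
    "println(viaW)           // List(gen, 1, 1, caf, in, the, beginning)\n",
    "println(viaAlphabetic)  // List(gen, caf\u00e9, in, the, beginning)\n",
    "```\n",
    "\n",
    "Note that `split` can still return an empty first element when a line begins with a separator, so a real pipeline may want to filter out empty strings."
   ]
  },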
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "\n",
    "#### WordCount2 Exercises\n",
    "\n",
    "At the end of each example source file, you'll find exercises you can try. Solutions for some of them are implemented in the `solns` package. For example, [solns/WordCount2GroupBy.scala](https://github.com/deanwampler/spark-scala-tutorial/blob/master/src/main/scala/sparktutorial/solns/WordCount2GroupBy.scala) solves the \"group by\" exercise described in `WordCount2.scala`."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Exercises"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Exercise 1: Understand the output\n",
    "\n",
    "If you look at the (unsorted) output, you may find a lot of entries where the word is a number, for example if you split on `\"\\\\W+\"` rather than on non-alphabetic characters as above. (Try searching the input text file to find them.) Are there really that many numbers in the Bible? If not, where did the numbers come from? Look at the original file for clues. Hint: the file format is `book|chapter#|verse#|verse_text`."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Exercise 2: Use other versions of the Bible and related texts\n",
    "\n",
    "The data directory contains similar files for the Tanach (`t3utf.dat`, in Hebrew), the Latin Vulgate (`vuldat.txt`), and the Septuagint (`sept.txt`, in Greek). Try the word count on one or more of them."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Exercise 3: Sort the output\n",
    "\n",
    "See the Scaladoc page for [OrderedRDDFunctions](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.OrderedRDDFunctions). Sort the output by word; try both ascending and descending order. Note that this can be expensive for large data sets! (Why?)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Exercise 4: Count the number of words ...\n",
    "\n",
    "Take the output from the previous exercise and count the number of words that start with each letter of the alphabet and each digit. "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Exercise 5 (Hard): Sort the output by count\n",
    "\n",
    "You can't use the same approach as in the previous exercise. Hint: See [RDD.keyBy](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD). What's the most frequent word that isn't a \"stop word\" (too-common words like \"the\", \"he\", \"it\", \"a\", ...)?"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Exercise 6 (Hard): Group the word-count pairs by count\n",
    "\n",
    "In other words, group together all pairs where the count is 1 (i.e., just one occurrence of those words was found), all pairs where the count is 2, is 3, etc. Sort ascending or descending. Hint: Is there a method for grouping?"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Exercise 7 (Thought Experiment)\n",
    "\n",
    "Consider the size of each group created in the previous exercise and the distribution of those sizes vs. counts.\n",
    "\n",
    "What characteristics would you expect for this distribution? That is, which words (or kinds of words) would you expect to occur most frequently? What kind of _distribution_ fits the counts?"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Apache Toree - Scala",
   "language": "scala",
   "name": "apache_toree_scala"
  },
  "language_info": {
   "codemirror_mode": "text/x-scala",
   "file_extension": ".scala",
   "mimetype": "text/x-scala",
   "name": "scala",
   "pygments_lexer": "scala",
   "version": "2.11.8"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
